<?php
// Use the Asia/Shanghai timezone as the process default; the date() calls that
// timestamp the log lines further down are formatted in this timezone.
date_default_timezone_set('Asia/Shanghai');

require_once './vendor/autoload.php';
require_once './common/logic/JobLogic.php';

use Workerman\Worker;
use Workerman\Lib\Timer;
use GuzzleHttp\Client;
use GuzzleHttp\Pool;
use common\logic\JobLogic;
use Psr\Http\Message\ResponseInterface;
use GuzzleHttp\Exception\RequestException;
use common\tool\Log;
use Cron\CronExpression;

// Disabled alternative: a worker listening with the text protocol on port 8899.
//$tcp_worker = new Worker("text://0.0.0.0:8899");

// Create the worker without a listen address, so it does not bind to any port.
$tcp_worker = new Worker();

// Number of worker processes to start.
$tcp_worker->count = 1;

// Name of this worker instance.
$tcp_worker->name = 'MajorbioCron';

// Worker start-up hook: registers the recurring timer that dispatches due cron jobs.
//
// On every tick the job list is re-read through JobLogic::readJobs(). Each job
// whose 'status' is 'on' and whose 'cron' fields form a due cron expression has
// its 'command' URL requested with an HTTP GET. All requests of one tick are sent
// concurrently through a Guzzle Pool and every outcome is written to the log.
//
// NOTE: $promise->wait() blocks this process until every request of the tick has
// finished (bounded by the 59 second request timeout), so following ticks are
// delayed while requests are still in flight.
$tcp_worker->onWorkerStart = function($worker)
{
    // Only the process with id 0 registers the timer.
    if($worker->id === 0)
    {
        // Polling interval in seconds. It is deliberately shorter than a minute so a
        // delayed tick cannot skip a whole minute; the per-minute bookkeeping below
        // makes sure the short interval does not dispatch a job more than once.
        $time_interval = 10;

        // Jobs already dispatched during the current minute. CronExpression::isDue()
        // stays true for the whole matching minute, so without this record every due
        // job would be requested again on each tick of that minute.
        $dispatched = ['minute' => '', 'jobs' => []];

        $timer_id = Timer::add(
            $time_interval,
            function() use (&$dispatched)
            {
                // One time snapshot for the whole tick, so the minute used for the
                // bookkeeping and the minute evaluated by isDue() can never differ.
                $now = new DateTime();
                $minute = $now->format('YmdHi');
                if($dispatched['minute'] !== $minute)
                {
                    // A new minute started: forget what was dispatched before.
                    $dispatched = ['minute' => $minute, 'jobs' => []];
                }

                $L_job = new JobLogic();
                $jobs = $L_job->readJobs();
                if(empty($jobs))
                {
                    return;
                }

                // Collect the URLs of all enabled jobs that are due and not yet dispatched.
                $request_urls = [];
                foreach($jobs as $job_key => $job)
                {
                    if($job['status'] != 'on')
                    {
                        continue;
                    }

                    $cron = implode(' ', $job['cron']);
                    $job_id = $job_key.'|'.$cron.'|'.$job['command'];
                    if(isset($dispatched['jobs'][$job_id]))
                    {
                        continue;
                    }

                    try
                    {
                        $is_due = CronExpression::factory($cron)->isDue($now);
                    }
                    catch(\Exception $e)
                    {
                        // A malformed expression must not abort the tick for the other
                        // jobs. Mark it as handled so it is reported once per minute
                        // instead of on every tick.
                        $dispatched['jobs'][$job_id] = true;
                        Log::write(Log::logFormat(date('Y-m-d H:i:s').' 任务 '.$job['command'].' 的 cron 表达式 "'.$cron.'" 无效：'.$e->getMessage()));
                        continue;
                    }

                    if($is_due)
                    {
                        $dispatched['jobs'][$job_id] = true;
                        $request_urls[] = $job['command'];
                    }
                }

                if(empty($request_urls))
                {
                    return;
                }

                $client = new Client();

                // Generator handing the pool one lazily created GET request per URL;
                // the yielded keys (0..n-1) are the $index passed to the callbacks.
                $request = function() use ($request_urls, $client) {
                    foreach($request_urls as $request_url) {
                        yield function() use ($request_url, $client){
                            return $client->getAsync($request_url);
                        };
                    }
                };

                $pool = new Pool($client, $request(), [
                    // Send every due request at once.
                    'concurrency' => count($request_urls),
                    'fulfilled' => function($response, $index) use ($request_urls) {
                        Log::write(Log::logFormat(date('Y-m-d H:i:s').' 请求第 '.$index.' 个请求'.$request_urls[$index].'，返回信息 '.$response->getBody()));
                    },
                    'rejected' => function ($reason, $index){
                        // Only a RequestException can carry a response. Other rejection
                        // reasons (e.g. connection failures in newer Guzzle versions) do
                        // not have getResponse(), so calling it blindly would be fatal.
                        $response = ($reason instanceof RequestException) ? $reason->getResponse() : null;
                        Log::write(Log::logFormat(date('Y-m-d H:i:s').' 请求第 '.$index.' 个请求，失败原因：'.($response ? $response->getBody() : $reason)));
                    },
                    'options' => [
                        'connect_timeout' => 3,
                        'timeout' => 59
                    ]
                ]);

                // Block until every request of this tick has settled.
                $promise = $pool->promise();
                $promise->wait();
            }
        );
        echo "Worker: ".$worker->id."\n";
        echo "Timer Number: ".$timer_id."\n";
    }
};

// Called when a new connection is established; prints the connection id.
$tcp_worker->onConnect = function($connection)
{
    $connection_id = $connection->id;
    echo "New Connection:{$connection_id}\n";
};

// Called when a connection delivers a message. The payload in $buffer is not
// inspected: every message is acknowledged with the same JSON reply
// {"c":0,"m":"pass"}.
$tcp_worker->onMessage = function($connection, $buffer)
{
    $connection->send(json_encode(['c'=>0, 'm'=>'pass']));
};

// Called when a connection has been closed; prints the connection id.
$tcp_worker->onClose = function($connection)
{
    printf("Connection closed:%s\n", $connection->id);
};

// Run all workers.
Worker::runAll();
